Spark 3.3: Improve task and job abort handling #6876

Merged
aokolnychyi merged 1 commit into apache:master from aokolnychyi:improve-abort on Feb 23, 2023
Conversation

@aokolnychyi (Contributor)

This PR improves our task and job abort handling in Spark 3.3.

  • This change leverages bulk deletes whenever possible.
  • This change adds helpful log messages that indicate how many files were deleted and, when available, the task context.
[Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Aborting commit for partition 0 (task 0, attempt 0, stage 0.0)
[Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] INFO org.apache.iceberg.spark.source.SparkCleanupUtil - Deleted 2 file(s) (partition 0 (task 0, attempt 0, stage 0.0))
...
[Test worker] ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec - Data source write support IcebergBatchWrite(table=testhive.default.table, format=PARQUET) is aborting.
[Test worker] INFO org.apache.iceberg.spark.source.SparkCleanupUtil - Deleted 0 file(s) (job abort)
[Test worker] ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec - Data source write support IcebergBatchWrite(table=testhive.default.table, format=PARQUET) aborted.
[Executor task launch worker for task 0.0 in stage 3.0 (TID 4)] ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Aborting commit for partition 0 (task 4, attempt 0, stage 3.0)
[Executor task launch worker for task 0.0 in stage 3.0 (TID 4)] INFO org.apache.iceberg.spark.source.SparkCleanupUtil - Deleted 2 file(s) using bulk deletes (partition 0 (task 4, attempt 0, stage 3.0))
...
[Test worker] ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec - Data source write support IcebergBatchWrite(table=testhivebulk.default.table, format=PARQUET) is aborting.
[Test worker] INFO org.apache.iceberg.spark.source.SparkCleanupUtil - Deleted 0 file(s) using bulk deletes (job abort)
[Test worker] ERROR org.apache.spark.sql.execution.datasources.v2.AppendDataExec - Data source write support IcebergBatchWrite(table=testhivebulk.default.table, format=PARQUET) aborted.
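As a rough illustration of the cleanup strategy behind those log lines (this is a hypothetical sketch, not the actual SparkCleanupUtil code; the BulkIo interface and method names are illustrative), the abort path prefers a single bulk-delete call when the IO supports it and falls back to per-file deletes otherwise:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the abort cleanup strategy; BulkIo and the names
// below are illustrative, not Iceberg's actual API.
public class CleanupSketch {

  // IO that can delete many files in one call (e.g. an S3-style batch delete).
  public interface BulkIo {
    void deleteFiles(Iterable<String> paths);
  }

  // Returns the number of files handed to the delete path, which is what a
  // "Deleted N file(s)" log message would report.
  public static int delete(List<String> paths, BulkIo bulkIo, Consumer<String> singleDelete) {
    if (bulkIo != null) {
      bulkIo.deleteFiles(paths); // one bulk round trip instead of N calls
    } else {
      paths.forEach(singleDelete); // per-file fallback
    }
    return paths.size();
  }
}
```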

@github-actions github-actions bot added the spark label Feb 17, 2023
Map<String, String> props = table.properties();
Tasks.foreach(files(messages))
.executeWith(ThreadPools.getWorkerPool())
.retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
@aokolnychyi (Contributor, Author)

I don't think it is reasonable to use the commit retry mechanism for deletes; this was the only place we did that. For now, I added some default configs in SparkCleanupUtil. I doubt we want to make them configurable.
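The idea of fixed, non-configurable cleanup retries (instead of reusing the table's commit retry properties) could be sketched like this; the constant and helper are illustrative, not Iceberg's actual code:

```java
// Hypothetical sketch: a fixed retry budget for cleanup deletes, decoupled
// from the table's COMMIT_NUM_RETRIES property. DELETE_NUM_RETRIES is an
// illustrative default, not a real Iceberg constant.
public class RetrySketch {

  static final int DELETE_NUM_RETRIES = 3;

  // Runs the action, retrying on RuntimeException up to the fixed budget.
  public static void runWithRetries(Runnable action) {
    RuntimeException last = null;
    for (int attempt = 0; attempt <= DELETE_NUM_RETRIES; attempt++) {
      try {
        action.run();
        return; // success, stop retrying
      } catch (RuntimeException e) {
        last = e; // remember the failure and try again
      }
    }
    throw last; // budget exhausted, surface the last failure
  }
}
```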

: ImmutableList.of()));
}
return ImmutableList.of();
private List<DataFile> files(WriterCommitMessage[] messages) {
@aokolnychyi (Contributor, Author)

I need a list to know the collection size.

@szehon-ho (Member), Feb 18, 2023

I have a little concern about memory: now we are materializing paths into a List instead of keeping them as an Iterable (if they are originally). I see it's mostly to log sizes; I wonder if we couldn't implement a wrapping counting Iterable for that?

Contributor

I'm okay either way; it seems like we were previously materializing the WriterCommitMessages, which have the files, anyway. Using S3 as an example, it takes 1 million objects with the worst-case key length of 1024 bytes to use about 1 GB of memory.
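A quick back-of-the-envelope check of that estimate (raw key bytes only; Java object and char overhead would increase the real footprint):

```java
// Sanity check of the memory estimate above: a million S3 keys at the
// maximum key length of 1024 bytes is roughly 1 GB of raw key data.
public class KeyMemoryEstimate {
  public static long rawKeyBytes(long objectCount, long keyLengthBytes) {
    return objectCount * keyLengthBytes;
  }
}
```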

@szehon-ho (Member), Feb 20, 2023

That's true. Actually, @amogh-jahagirdar was wondering: is there a reason we don't have deleteFiles() return the number of deleted files? It would probably be more convenient for callers to log the size that way.

@aokolnychyi (Contributor, Author)

I changed the code to keep a list of files (it shouldn't cost anything extra, as those files are already there) and switched to using Lists.transform(), which is a lazy transform, in SparkCleanupUtil.
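For reference, a lazy transform returns a view that converts elements on access, so no second list of paths is materialized while size() stays available for logging. A minimal sketch of the idea behind Guava's Lists.transform (this is an illustrative reimplementation, not Guava's code):

```java
import java.util.AbstractList;
import java.util.List;
import java.util.function.Function;

// Minimal sketch of the lazy-view idea behind Guava's Lists.transform: the
// function is applied inside get(), so no transformed copy is ever built.
public class LazyTransformSketch {

  public static <F, T> List<T> transform(List<F> from, Function<? super F, ? extends T> fn) {
    return new AbstractList<T>() {
      @Override
      public T get(int index) {
        return fn.apply(from.get(index)); // computed on access, not stored
      }

      @Override
      public int size() {
        return from.size(); // size available without materializing elements
      }
    };
  }
}
```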

@aokolnychyi (Contributor, Author)

@szehon-ho @amogh-jahagirdar, could you take another look?

@aokolnychyi

}

// the format matches what Spark uses for internal logging
private static String taskInfo() {
Member

Nit: what do you think about moving the private method to the bottom? It breaks the flow of the code a bit (I would have liked to see deleteFiles right after deleteTaskFiles, as it's the main delegate).

@aokolnychyi (Contributor, Author)

I was trying to group methods by logic instead of access. My reasoning here was that taskInfo() is only invoked in this method and is directly related to deleteTaskFiles(). Let me know if that makes sense.

Member

Yeah, it's definitely subjective. I personally prefer to see the public methods and their Javadocs first, to get a high-level idea of what the class does before diving into details (especially given there are only two public methods in this class). But as it's a style preference, I'll leave it optional.

@amogh-jahagirdar (Contributor) left a comment


Thanks @aokolnychyi, great to see this improvement!

try {
io.deleteFiles(paths);
LOG.info("Deleted {} file(s) using bulk deletes ({})", paths.size(), context);

Contributor

Nit: unnecessary newline

@aokolnychyi (Contributor, Author)

We do this sometimes when either the try or catch block is non-trivial, to separate them.

if (cleanupOnAbort) {
SparkCleanupUtil.deletePaths("job abort", table.io(), filePaths(messages));
} else {
LOG.warn("Skipping cleanup of written files, unable to determine the final commit state");
Contributor

The "skipping cleanup of written files" part makes sense to me, but wouldn't "unable to determine the final commit state" apply in both cases (any abort case)? Or are we trying to indicate that we won't be cleaning up any orphan files?

@aokolnychyi (Contributor, Author)

I adapted the original comment, but I agree it is a bit weird, as the var name is generic and does not say anything about commit state. I changed it.

return "unknown task";
} else {
return String.format(
"partition %d (task %d, attempt %d, stage %d.%d)",
Contributor

Can we show the stage attempt ID better, something like
Task (id: <TaskID>, attempt: <attemptNumber>), Stage (id: <stageId>, attempt: <attemptNumber>)
in place of
(task 0, attempt 0, stage 0.0)?

@aokolnychyi (Contributor, Author)

My idea is to follow the exact format used in Spark so that we can easily match Spark and Iceberg logs.

[Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] ERROR org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Aborting commit for partition 0 (task 0, attempt 0, stage 0.0)
[Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] INFO org.apache.iceberg.spark.source.SparkCleanupUtil - Deleted 2 file(s) (partition 0 (task 0, attempt 0, stage 0.0))

In this example, it is clear that these two records belong to the same context, even though they were produced by Spark and Iceberg. If we change the format, it won't be obvious.
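The Spark-matching format could be sketched like this (the signature here is illustrative; the real method would read these values from Spark's TaskContext rather than take parameters):

```java
// Illustrative sketch of formatting task info to match Spark's own logging,
// "partition %d (task %d, attempt %d, stage %d.%d)". In practice these
// values would come from Spark's TaskContext, not method parameters.
public class TaskInfoSketch {

  public static String taskInfo(
      int partitionId, long taskAttemptId, int attemptNumber, int stageId, int stageAttemptNumber) {
    return String.format(
        "partition %d (task %d, attempt %d, stage %d.%d)",
        partitionId, taskAttemptId, attemptNumber, stageId, stageAttemptNumber);
  }
}
```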

@aokolnychyi aokolnychyi force-pushed the improve-abort branch 2 times, most recently from f669b92 to b73396c Compare February 21, 2023 22:13
@aokolnychyi (Contributor, Author)

@szehon-ho @amogh-jahagirdar @singhpk234, could you take another look?

@szehon-ho (Member) left a comment


Looks good to me; small comments for consideration.

/**
* Attempts to delete as many given files as possible.
*
* @param context a helpful description of the context in which this method is invoked
Member

Nit: thanks for the Javadoc. How about 'a helpful description of the operation invoking this method' (to avoid reusing 'context' to define itself)? Not sure it's completely accurate, though.

@aokolnychyi (Contributor, Author)

I like it, let me change.

@aokolnychyi aokolnychyi merged commit 3efaee1 into apache:master Feb 23, 2023
@aokolnychyi

Thanks for reviewing, @szehon-ho @singhpk234 @amogh-jahagirdar!
